//! Module containing implementation of the pivot operation.
//!
//! Polars lazy does not implement a pivot because it is impossible to know the schema without
//! materializing the whole dataset. This makes a pivot quite a terrible operation for performant
//! workflows. An optimization can never be pushed down past a pivot.
//!
//! We can do a pivot on an eager `DataFrame` as that is already materialized. The code for the
//! pivot is here, because we want to be able to pass expressions to the pivot operation.
//!

use polars_core::frame::group_by::expr::PhysicalAggExpr;
use polars_core::prelude::*;
use polars_ops::pivot::PivotAgg;

use crate::physical_plan::exotic::{contains_column_refs, prepare_expression_for_context};
use crate::prelude::*;

/// Newtype around an [`Expr`] that lets a user-supplied expression act as the
/// aggregation of a pivot (see the `PhysicalAggExpr` implementation for this type).
pub struct PivotExpr(Expr);

impl PivotExpr {
    /// Wraps `expr` without inspecting or modifying it.
    pub fn from_expr(expr: Expr) -> Self {
        Self(expr)
    }
}

impl PhysicalAggExpr for PivotExpr {
    fn evaluate_on_groups(&self, df: &DataFrame, groups: &GroupPositions) -> PolarsResult<Series> {
        let state = ExecutionState::new();
        let dtype = df.get_columns()[0].dtype();
        let phys_expr = prepare_expression_for_context(
            PlSmallStr::EMPTY,
            &self.0,
            dtype,
            Context::Aggregation,
        )?;
        phys_expr
            .evaluate_on_groups(df, groups, &state)
            .map(|mut ac| ac.aggregated().take_materialized_series())
    }

    fn root_name(&self) -> PolarsResult<&PlSmallStr> {
        Ok(PlSmallStr::EMPTY_REF)
    }
}

pub fn pivot<I0, I1, I2, S0, S1, S2>(
    df: &DataFrame,
    on: I0,
    index: Option<I1>,
    values: Option<I2>,
    sort_columns: bool,
    agg_expr: Option<Expr>,
    // used as separator/delimiter in generated column names.
    separator: Option<&str>,
) -> PolarsResult<DataFrame>
where
    I0: IntoIterator<Item = S0>,
    I1: IntoIterator<Item = S1>,
    I2: IntoIterator<Item = S2>,
    S0: Into<PlSmallStr>,
    S1: Into<PlSmallStr>,
    S2: Into<PlSmallStr>,
{
    // we are strict:
    // agg_expr can only access data as generated by the pivot operation through pl.element()
    if agg_expr.as_ref().is_some_and(contains_column_refs) {
        polars_bail!(InvalidOperation: "explicit column references are not allowed in aggregate_function");
    }

    let agg_expr = agg_expr.map(|ae| PivotAgg(Arc::new(PivotExpr(ae))));
    polars_ops::pivot::pivot(df, on, index, values, sort_columns, agg_expr, separator)
}

pub fn pivot_stable<I0, I1, I2, S0, S1, S2>(
    df: &DataFrame,
    on: I0,
    index: Option<I1>,
    values: Option<I2>,
    sort_columns: bool,
    agg_expr: Option<Expr>,
    // used as separator/delimiter in generated column names.
    separator: Option<&str>,
) -> PolarsResult<DataFrame>
where
    I0: IntoIterator<Item = S0>,
    I1: IntoIterator<Item = S1>,
    I2: IntoIterator<Item = S2>,
    S0: Into<PlSmallStr>,
    S1: Into<PlSmallStr>,
    S2: Into<PlSmallStr>,
{
    // we are strict:
    // agg_expr can only access data as generated by the pivot operation through pl.element()
    if agg_expr.as_ref().is_some_and(contains_column_refs) {
        polars_bail!(InvalidOperation: "explicit column references are not allowed in aggregate_function");
    }

    let agg_expr = agg_expr.map(|ae| PivotAgg(Arc::new(PivotExpr(ae))));
    polars_ops::pivot::pivot_stable(df, on, index, values, sort_columns, agg_expr, separator)
}
